package gu.simplemq.redis;

import java.util.Collection;

import com.google.common.base.Function;

import gu.simplemq.Channel;
import gu.simplemq.IAdvisor;
import gu.simplemq.IPublisher;
import gu.simplemq.json.BaseJsonEncoder;
import gu.simplemq.utils.CommonUtils;
import redis.clients.jedis.Jedis;

/**
 * 
 * {@link IPublisher} redis实现
 * @author guyadong
 *
 */
public class RedisPublisher implements IPublisher,IRedisComponent,RedisConstants{
	private static final BaseJsonEncoder encoder = BaseJsonEncoder.getEncoder();
	/**
	 * 将对象序列化为JSON字符串的{@link Function} 实例
	 */
	private final static Function<Object, String> jsonFun = new Function<Object,String>(){
		@Override
		public String apply(Object input) {
			return encoder.toJsonString(input);
		}};
	private final JedisPoolLazy poolLazy;
	private final RedisConsumerAdvisor advisor;
	/**
	 * 将对象序列化为字符串的实例默序列化为JSON字符串
	 */
	private Function<Object, String> stringSerializer = jsonFun;
	@Override
	public JedisPoolLazy getPoolLazy() {
		return this.poolLazy;

	}

	public RedisPublisher(JedisPoolLazy poolLazy) {
		super();
		this.poolLazy = poolLazy;
		this.advisor = RedisConsumerAdvisor.topicAdvisorOf(poolLazy);
	}
	
	@Override
	public <T> long publish(Channel<T> channel, T obj) {
		if(null == obj){
			return 0L;
		}
		if(null != channel.type){
			// 检查发布的对象类型与频道数据类型是否匹配
			if(channel.type instanceof Class<?> && !((Class<?>)channel.type).isInstance(obj)){
				throw new IllegalArgumentException("invalid type of 'obj'");
			}
		}
		Jedis jedis = this.poolLazy.apply();
		try{
			Long numOfClient = jedis.publish(channel.name, this.stringSerializer.apply(obj));
			return null == numOfClient ? 0L : numOfClient.longValue();
		}finally{
			this.poolLazy.free();
		}
	}

	@Override
	public <T> void publish(Channel<T> channel, Collection<T> objects) {
		objects = CommonUtils.cleanNullAsList(objects);
		for(T obj:objects){
			publish(channel,obj);
		}
	}

	@Override
	public <T> void publish(Channel<T> channel, @SuppressWarnings("unchecked") T... objects) {
		publish(channel,CommonUtils.cleanNullAsList(objects));
	}
	
	@Override
	public IAdvisor getAdvisor() {
		return advisor;
	}

	@Override
	public IPublisher withStringSerializer(Function<Object, String> stringSerializer) {
		this.stringSerializer = null == stringSerializer ? jsonFun: stringSerializer;
		return this;
	}
}
